{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Set Up Ais.Net Receiver"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "#r \"nuget:Ais.Net.Receiver\"\r\n",
        "#r \"nuget:Ais.Net.Models\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using Ais.Net.Models;\r\n",
        "using Ais.Net.Models.Abstractions;\r\n",
        "using Ais.Net.Receiver.Configuration;\r\n",
        "using Ais.Net.Receiver.Receiver;\r\n",
        "\r\n",
        "static ReceiverHost CreateReceiverHost() =>\r\n",
        "    new(new NetworkStreamNmeaReceiver(host: \"153.44.253.27\", port: 5631, retryAttemptLimit: 100, retryPeriodicity: TimeSpan.Parse(\"00:00:00:00.500\")));"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Set up Bing Maps"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "javascript"
        }
      },
      "outputs": [],
      "source": [
        "interactive.registerCommandHandler({commandType: 'VesselPositionCommand', handle: c => {\r\n",
        "    UpdateVesselPosition(c.command);\r\n",
        "}});"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "> **Note**: In the cell below, replace `<INSERT_CREDENTIALS_HERE>` by your own Bing Maps access key obtained from the [Bing Maps Dev Center](https://www.bingmapsportal.com)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "html"
        }
      },
      "outputs": [],
      "source": [
        "<!DOCTYPE html>\r\n",
        "<html>\r\n",
        "<head>\r\n",
        "    <meta charset=\"utf-8\" />\r\n",
        "    <script type='text/javascript' src='http://www.bing.com/api/maps/mapcontrol?callback=GetMap' async defer></script>\r\n",
        "    <script type='text/javascript'>\r\n",
        "        var map;\r\n",
        "\r\n",
        "        function UpdateVesselPosition(position) {\r\n",
        "            var pin = getPushpinById(position.mmsi);\r\n",
        "            var loc = new Microsoft.Maps.Location(position.lat, position.lon);\r\n",
        "\r\n",
        "            if (pin == null)\r\n",
        "            {\r\n",
        "                var pin = new Microsoft.Maps.Pushpin(loc, {\r\n",
        "                                icon: createRedArrow(position.courseOverGroundDegrees),\r\n",
        "                                title: position.name,\r\n",
        "                                subTitle: position.mmsi\r\n",
        "                            });\r\n",
        "\r\n",
        "                pin.metadata = { id: position.mmsi };\r\n",
        "                map.entities.push(pin);\r\n",
        "            }\r\n",
        "            else\r\n",
        "            {\r\n",
        "                pin.setLocation(loc);\r\n",
        "            }\r\n",
        "        }\r\n",
        "\r\n",
        "        function GetMap() {\r\n",
        "            map = new Microsoft.Maps.Map('#myMap', {\r\n",
        "                credentials: \"<INSERT_CREDENTIALS_HERE>\"\r\n",
        "            });\r\n",
        "\r\n",
        "            // Center around Norway and zoom in sufficiently.\r\n",
        "            map.setView({ center: new Microsoft.Maps.Location(67, 15), zoom: 4 });\r\n",
        "        }\r\n",
        "\r\n",
        "        function createRedArrow(heading) {\r\n",
        "            var c = document.createElement('canvas'); c.width = 24; c.height = 24;\r\n",
        "            var ctx = c.getContext('2d');\r\n",
        "\r\n",
        "            // Offset the canvas such that we will rotate around the center of our arrow.\r\n",
        "            ctx.translate(c.width * 0.5, c.height * 0.5);\r\n",
        "\r\n",
        "            // Rotate the canvas by the desired heading.\r\n",
        "            ctx.rotate(heading * Math.PI / 180);\r\n",
        "\r\n",
        "            // Return the canvas offset back to it's original position.\r\n",
        "            ctx.translate(-c.width * 0.5, -c.height * 0.5);\r\n",
        "\r\n",
        "            // Draw a path in the shape of an arrow.\r\n",
        "            ctx.fillStyle = '#f00';\r\n",
        "            ctx.beginPath();\r\n",
        "            ctx.moveTo(12, 0); ctx.lineTo(5, 20); ctx.lineTo(12, 15); ctx.lineTo(19, 20); ctx.lineTo(12, 0);\r\n",
        "            ctx.closePath();\r\n",
        "            ctx.fill();\r\n",
        "            ctx.stroke();\r\n",
        "\r\n",
        "            // Generate the base64 image URL from the canvas.\r\n",
        "            return c.toDataURL();\r\n",
        "        }\r\n",
        "\r\n",
        "        function getPushpinById(id) \r\n",
        "        {\r\n",
        "            console.log(\"find \" + id);\r\n",
        "            var pin;\r\n",
        "            for (i = 0; i < map.entities.getLength(); i++) {\r\n",
        "                pin = map.entities.get(i);\r\n",
        "                if (pin.metadata && pin.metadata.id === id) {\r\n",
        "                    return pin;\r\n",
        "                }\r\n",
        "            }\r\n",
        "        }\r\n",
        "    </script>\r\n",
        "    <style>\r\n",
        "        #myMap {\r\n",
        "            position: relative;\r\n",
        "            width: 100%;\r\n",
        "            height: 800px;\r\n",
        "        }\r\n",
        "    </style>\r\n",
        "</head>\r\n",
        "<body>\r\n",
        "    <div id=\"myMap\"></div>\r\n",
        "</body>\r\n",
        "</html>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Set up Reaqtor"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "#r \"bin/Debug/net5.0/Reaqtor.Shebang.App.dll\""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Utilities to work with query engines"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using System.IO;\r\n",
        "\r\n",
        "using Reaqtor.Shebang.App;\r\n",
        "using Reaqtor.Shebang.Client;\r\n",
        "using Reaqtor.Shebang.Linq;\r\n",
        "using Reaqtor.Shebang.Service;\r\n",
        "\r\n",
        "async Task WithNewEngine(IQueryEngineStateStore store, IIngressEgressManager iemgr, Func<ClientContext, Task> action)\r\n",
        "{\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Creating engine...\");\r\n",
        "\r\n",
        "    var qe = await QueryEngineFactory.CreateNewAsync(store, ingressEgressManager: iemgr);\r\n",
        "    var ctx = qe.Client;\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Invoking user code...\");\r\n",
        "\r\n",
        "    await action(ctx);\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Checkpointing engine...\");\r\n",
        "\r\n",
        "    await qe.CheckpointAsync();\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Unloading engine...\");\r\n",
        "\r\n",
        "    await qe.UnloadAsync();\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Engine unloaded.\");\r\n",
        "}\r\n",
        "\r\n",
        "async Task WithExistingEngine(IQueryEngineStateStore store, IIngressEgressManager iemgr, Func<ClientContext, Task> action)\r\n",
        "{\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Recovering engine...\");\r\n",
        "\r\n",
        "    var qe = await QueryEngineFactory.RecoverAsync(store, ingressEgressManager: iemgr);\r\n",
        "    var ctx = qe.Client;\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Invoking user code...\");\r\n",
        "\r\n",
        "    await action(ctx);\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Checkpointing engine...\");\r\n",
        "\r\n",
        "    await qe.CheckpointAsync();\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Unloading engine...\");\r\n",
        "\r\n",
        "    await qe.UnloadAsync();\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Engine unloaded.\");\r\n",
        "}\r\n",
        "\r\n",
        "string SaveStore(InMemoryKeyValueStore store)\r\n",
        "{\r\n",
        "    var file = Path.Combine(Environment.CurrentDirectory, $\"store_{Environment.TickCount64}.txt\");\r\n",
        "\r\n",
        "    store.Save(file);\r\n",
        "\r\n",
        "    Console.WriteLine($\"Stored dumped to {file}. Size = {new FileInfo(file).Length} bytes\");\r\n",
        "\r\n",
        "    return file;\r\n",
        "}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### (Optional) Test the setup"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "var store = new InMemoryKeyValueStore();\r\n",
        "var iemgr = new IngressEgressManager();\r\n",
        "\r\n",
        "await WithNewEngine(store, iemgr, async ctx =>\r\n",
        "{\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Create trivial subscription to test Reaqtor setup.\");\r\n",
        "\r\n",
        "    await ctx.SimpleTimer(TimeSpan.FromSeconds(1)).Select((_, i) => i.ToString()).OfType<string>().Cast<string>().SubscribeAsync(ctx.ConsoleOut, new Uri(\"test://sub/\" + Environment.TickCount64));\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Wait a bit to see events flowing.\");\r\n",
        "\r\n",
        "    await Task.Delay(TimeSpan.FromSeconds(5));\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Done.\");\r\n",
        "});"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "string file = SaveStore(store);"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "var store = InMemoryKeyValueStore.Load(file);\r\n",
        "var iemgr = new IngressEgressManager();\r\n",
        "\r\n",
        "await WithExistingEngine(store, iemgr, async ctx =>\r\n",
        "{\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Wait a bit to see events flowing. (NB: There are known issues with Notebooks infra that prevent console output here.)\");\r\n",
        "\r\n",
        "    await Task.Delay(TimeSpan.FromSeconds(5));\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Done.\");\r\n",
        "});"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Bridge Reaqtor with Ais and kernel"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Get the client kernel\r\n",
        "\r\n",
        "We'll use the client-side kernel object later to send events through an observer abstraction."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using Microsoft.DotNet.Interactive;\r\n",
        "using Microsoft.DotNet.Interactive.Commands;\r\n",
        "\r\n",
        "var jsKernel = Kernel.Current.FindKernel(\"javascript\");"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Define data model types\r\n",
        "\r\n",
        "The Reaqtor configuration provided by the Shebang sample stack uses the Nuqleon Data Model to represent entities. While the query operators are functional with arbitrary event types, checkpointing requires the ability to serialize and deserialize events, e.g. if they're stored in the state of `CombineLatest` (to represent the latest values received on the different inputs). The current AIS data types use interfaces, which cannot be deserialized (because the serializer doesn't know which concrete implementation to instantiate).\r\n",
        "\r\n",
        "**Note:** Serialization is pluggable in the Reaqtor query engine, so one could come up with a sophisticated serializer mechanism that supports interfaces and knows which concrete type to instantiate upon deserialization. This could allow AIS events to still be represented using interface implementations that are simple wrappers around underlying state (with minimal allocation overhead, using `Span<T>` etc. underneath), but a second implementation would have to be added to support deserialization (coming from a different underlying representation, e.g. JSON). Note that the Nuqleon Data Model has the advantage that once the queries are formulated using it, these queries and their state are completely portable across machines, and events are exchangable across machines, without having to rely on assemblies or types being deployed. The structural definition of these entity types is captured in checkpoint state (rather than a nominal definition, i.e. referring to a type in an assembly, by name)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using Nuqleon.DataModel;\r\n",
        "\r\n",
        "public class VesselName\r\n",
        "{\r\n",
        "    [Mapping(\"ais://vessel_name/name\")]\r\n",
        "    public string Name { get; set; }\r\n",
        "}\r\n",
        "\r\n",
        "public class VesselNavigation\r\n",
        "{\r\n",
        "    [Mapping(\"ais://vessel_nav/position\")]\r\n",
        "    public Coordinates Position { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"ais://vessel_nav/cogdegrees\")]\r\n",
        "    public float? CourseOverGroundDegrees { get; set; }\r\n",
        "}\r\n",
        "\r\n",
        "public class Coordinates\r\n",
        "{\r\n",
        "    [Mapping(\"ais://coordinates/lat\")]\r\n",
        "    public double Latitude { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"ais://coordinates/long\")]\r\n",
        "    public double Longitude { get; set; }\r\n",
        "}\r\n",
        "\r\n",
        "public enum MessageType\r\n",
        "{\r\n",
        "    [Mapping(\"ais://messagetype/unknown\")]\r\n",
        "    Unknown,\r\n",
        "\r\n",
        "    [Mapping(\"ais://messagetype/name\")]\r\n",
        "    Name,\r\n",
        "\r\n",
        "    [Mapping(\"ais://messagetype/navigation\")]\r\n",
        "    Navigation,\r\n",
        "}\r\n",
        "\r\n",
        "public class AisMessage\r\n",
        "{\r\n",
        "    [Mapping(\"ais://msg/type\")]\r\n",
        "    public MessageType Type { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"ais://msg/name\")]\r\n",
        "    public VesselName Name { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"ais://msg/nav\")]\r\n",
        "    public VesselNavigation Navigation { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"ais://msg/mmsi\")]\r\n",
        "    public uint Mmsi { get; set; }\r\n",
        "}\r\n",
        "\r\n",
        "public class VesselPosition\r\n",
        "{\r\n",
        "    [Mapping(\"reaqtor://demo/vessel_tracking/lon\")]\r\n",
        "    public double Lon { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"reaqtor://demo/vessel_tracking/lat\")]\r\n",
        "    public double Lat { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"reaqtor://demo/vessel_tracking/cogdegrees\")]\r\n",
        "    public float CourseOverGroundDegrees { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"reaqtor://demo/vessel_tracking/mmsi\")]\r\n",
        "    public string Mmsi { get; set; }\r\n",
        "\r\n",
        "    [Mapping(\"reaqtor://demo/vessel_tracking/name\")]\r\n",
        "    public string Name { get; set; }\r\n",
        "}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Define the `VesselPositionCommand`\r\n",
        "\r\n",
        "Goal is to have an `IObserver<KernelCommand>` that calls the kernel with the given command instance. Note that the properties in this object are accessed on the JavaScript side in the client-side kernel, so property names should not be altered."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "public class VesselPositionCommand : KernelCommand\r\n",
        "{\r\n",
        "    public VesselPositionCommand() : base(\"javascript\") {}\r\n",
        "    public double Lon { get; set; }\r\n",
        "    public double Lat { get; set; }\r\n",
        "    public float CourseOverGroundDegrees { get; set; }\r\n",
        "    public string Mmsi { get; set; }\r\n",
        "    public string Name { get; set; }\r\n",
        "}\r\n",
        "\r\n",
        "jsKernel.RegisterCommandType<VesselPositionCommand>();"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Build a classic `IObserver<T>` for kernel commands\r\n",
        "\r\n",
        "The use of an observer will make it easier to write an Rx query that connects the egress subject coming out of Reaqtor to the client-side kernel."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "class KernelCommandObserver : IObserver<KernelCommand>\r\n",
        "{\r\n",
        "    private readonly Kernel _kernel;\r\n",
        "\r\n",
        "    public KernelCommandObserver(Kernel kernel) => _kernel = kernel;\r\n",
        "\r\n",
        "    public async void OnNext(KernelCommand value)\r\n",
        "    {\r\n",
        "        // REVIEW: Fire-and-forget; never checks the result; doesn't support cancellation.\r\n",
        "        _ = await _kernel.SendAsync(value);\r\n",
        "    }\r\n",
        "\r\n",
        "    public void OnError(Exception error) {}\r\n",
        "    public void OnCompleted() {}\r\n",
        "}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Define ingress and egress\r\n",
        "\r\n",
        "We want to send `IAisMessage`s into the query running in the engine, and receive `VesselPositionCommand`s out of the query. To do so, we'll allocate subjects in the `IngressEgressManager`. These will be wired up later:\r\n",
        "\r\n",
        "* inside the query engine by simply writing a query expression that uses `GetIngress<T>(string)` and `GetEgress<T>(string)`;\r\n",
        "* outside the query engine by using Rx queries that connect the AIS receiver to the ingress stream, and the egress stream to the kernel command observer."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "var iemgr = new IngressEgressManager();\r\n",
        "\r\n",
        "var ais_data = \"aisdata\";\r\n",
        "var vessel_positions = \"vesselpositions\";\r\n",
        "\r\n",
        "var ais_publisher = iemgr.CreateSubject<AisMessage>(ais_data);\r\n",
        "var vessel_position_consumer = iemgr.CreateSubject<VesselPosition>(vessel_positions);"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Add a helper to collect statistics\r\n",
        "\r\n",
        "In order to keep track of the number of events received by the engine and emitted by the engine, we'll define a little helper. We'll do the counting in `Do` operators in the queries on the outside of the query engine, thus counting what goes in and what comes out of the query running in the engine."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using System.Threading;\r\n",
        "\r\n",
        "class Stats\r\n",
        "{\r\n",
        "    private volatile int _rx;\r\n",
        "    private volatile int _tx;\r\n",
        "    private volatile int _groups;\r\n",
        "\r\n",
        "    public int ReceiveCount => _rx;\r\n",
        "    public int SendCount => _tx;\r\n",
        "    public int GroupCount => _groups;\r\n",
        "\r\n",
        "    public void OnReceived() => Interlocked.Increment(ref _rx);\r\n",
        "    public void OnSend() => Interlocked.Increment(ref _tx);\r\n",
        "    public void OnNewGroup() => Interlocked.Increment(ref _groups);\r\n",
        "\r\n",
        "    public void Reset() => (_rx, _tx, _groups) = (0, 0, 0);\r\n",
        "\r\n",
        "    public override string ToString() => $\"Received = {ReceiveCount}  Groups = {GroupCount}  Sent = {SendCount}\";\r\n",
        "}\r\n",
        "\r\n",
        "var stats = new Stats();"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Hook up the input side to receive AIS data\r\n",
        "\r\n",
        "Events don't flow until we call `receiverHost.StartAsync`, so we wrap the whole setup in a helper method we'll call later. First, we wire the `receiverHost.Messages` observable to the `ais_publisher`. To do so, we need to introduce sequence numbers because we're feeding into an `IReliableObserver<IAisMessage>`. We'll just generate increasing numbers obtained using `ToFileTime()` (and assume monotonicity, i.e. nobody is changing the system clock). A real implementation that consumes external events would support replay from the external producer here (e.g. EventHub)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using System.Reactive.Disposables;\r\n",
        "using System.Reactive.Linq;\r\n",
        "\r\n",
        "static IObservable<AisMessage> ConvertToDataModel(this IObservable<IAisMessage> source)\r\n",
        "{\r\n",
        "    return from msg in source\r\n",
        "           where msg is (IVesselName or IVesselNavigation)\r\n",
        "           select msg switch\r\n",
        "           {\r\n",
        "               IVesselName n =>\r\n",
        "                  new AisMessage\r\n",
        "                  {\r\n",
        "                      Type = MessageType.Name,\r\n",
        "                      Mmsi = msg.Mmsi,\r\n",
        "                      Name = new VesselName()\r\n",
        "                      {\r\n",
        "                          Name = n.VesselName\r\n",
        "                      }\r\n",
        "                  },\r\n",
        "               IVesselNavigation n =>\r\n",
        "                  new AisMessage\r\n",
        "                  {\r\n",
        "                      Type = MessageType.Navigation,\r\n",
        "                      Mmsi = msg.Mmsi,\r\n",
        "                      Navigation = new VesselNavigation()\r\n",
        "                      {\r\n",
        "                          CourseOverGroundDegrees = n.CourseOverGroundDegrees,\r\n",
        "                          Position = n.Position switch\r\n",
        "                          {\r\n",
        "                              Position p => new Coordinates() { Latitude = p.Latitude, Longitude = p.Longitude },\r\n",
        "                              null => null\r\n",
        "                          }\r\n",
        "                      }\r\n",
        "                  },\r\n",
        "               _ => null\r\n",
        "           };\r\n",
        "}\r\n",
        "\r\n",
        "IDisposable ReceiveEventsFromAis()\r\n",
        "{\r\n",
        "    var receiverHost = CreateReceiverHost();\r\n",
        "\r\n",
        "    var ais_publisher_subscription =\r\n",
        "        receiverHost.Messages\r\n",
        "            .Do(_ => stats.OnReceived())\r\n",
        "            .ConvertToDataModel()\r\n",
        "            .Select(msg => (sequenceId: (long)DateTimeOffset.UtcNow.ToFileTime(), item: msg))\r\n",
        "            .Subscribe(ais_publisher);\r\n",
        "\r\n",
        "    var group_count_subscription =\r\n",
        "        receiverHost.Messages\r\n",
        "            .GroupBy(msg => msg.Mmsi)\r\n",
        "            .Do(_ => stats.OnNewGroup())\r\n",
        "            .Subscribe(_ => {});\r\n",
        "\r\n",
        "    var cancel = new CancellationDisposable();\r\n",
        "\r\n",
        "    var t = receiverHost.StartAsync(cancel.Token);\r\n",
        "\r\n",
        "    return StableCompositeDisposable.Create(\r\n",
        "        cancel,\r\n",
        "        Disposable.Create(() =>\r\n",
        "        {\r\n",
        "            // NB: Wait for completion after cancellation.\r\n",
        "            try { t.GetAwaiter().GetResult(); }\r\n",
        "            catch (OperationCanceledException) {}\r\n",
        "        }),\r\n",
        "        ais_publisher_subscription,\r\n",
        "        group_count_subscription);\r\n",
        "}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Hook up the output side to send client kernel commands\r\n",
        "\r\n",
        "To send commands to the kernel, we'll use our `KernelCommandObserver` and hook it up to the `vessel_position_consumer`. This time we need to shake off sequence numbers emitted by the query engine."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using System.Reactive.Concurrency;\r\n",
        "\r\n",
        "IDisposable SendVesselPositionsToClientKernel()\r\n",
        "{\r\n",
        "    //\r\n",
        "    // NB: The use of the `NewThreadScheduler` is to work around some Notebook threading woes. Every time we\r\n",
        "    //     run the helper method here, it will cause new thread creation and establish a proper client kernel\r\n",
        "    //     connection. We'll use this helper method within a single cell later.\r\n",
        "    //\r\n",
        "\r\n",
        "    var jsKernel = Kernel.Current.FindKernel(\"javascript\");\r\n",
        "\r\n",
        "    return vessel_position_consumer\r\n",
        "        .Do(_ => stats.OnSend())\r\n",
        "        .Select(t => new VesselPositionCommand { Mmsi = t.item.Mmsi, Name = t.item.Name, Lat = t.item.Lat, Lon = t.item.Lon, CourseOverGroundDegrees = t.item.CourseOverGroundDegrees })\r\n",
        "        .ObserveOn(NewThreadScheduler.Default)\r\n",
        "        .Subscribe(new KernelCommandObserver(jsKernel));\r\n",
        "}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Run the query in Reaqtor\r\n",
        "\r\n",
        "Finally, we can run the query in Reaqtor. We'll create the query first, checkpoint the engine, and shut it down. While we could immediately start to process events after creating the subscription, we'll take a first checkpoint to allow for exploration of the state store prior to flowing events."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using System.Diagnostics;\r\n",
        "\r\n",
        "var subUri = new Uri(\"test://sub/\" + Environment.TickCount64);\r\n",
        "\r\n",
        "var store = new InMemoryKeyValueStore();\r\n",
        "\r\n",
        "await WithNewEngine(store, iemgr, async ctx =>\r\n",
        "{\r\n",
        "    var source = ctx.GetIngress<AisMessage>(ais_data);\r\n",
        "    var sink = ctx.GetEgress<VesselPosition>(vessel_positions);\r\n",
        "\r\n",
        "    var byVessel = source.GroupBy(m => m.Mmsi);\r\n",
        "\r\n",
        "    var vesselNavigationWithNameStream =\r\n",
        "        from perVesselMessages in byVessel\r\n",
        "        from vesselLocationAndName in\r\n",
        "                (from msg in perVesselMessages\r\n",
        "                 where msg.Type == MessageType.Navigation\r\n",
        "                 select msg.Navigation)\r\n",
        "            .CombineLatest(\r\n",
        "                (from msg in perVesselMessages\r\n",
        "                 where msg.Type == MessageType.Name\r\n",
        "                 select msg.Name),\r\n",
        "            (navigation, name) => new { navigation, name })\r\n",
        "        select\r\n",
        "            new VesselPosition\r\n",
        "            {\r\n",
        "                Lat = vesselLocationAndName.navigation.Position.Latitude,\r\n",
        "                Lon = vesselLocationAndName.navigation.Position.Longitude,\r\n",
        "                CourseOverGroundDegrees = vesselLocationAndName.navigation.CourseOverGroundDegrees ?? 0,\r\n",
        "                Mmsi = perVesselMessages.Key.ToString(),\r\n",
        "                Name = vesselLocationAndName.name.Name.CleanVesselName()\r\n",
        "            };\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Creating subscription...\");\r\n",
        "\r\n",
        "    await vesselNavigationWithNameStream.SubscribeAsync(sink, subUri);\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now.ToString(\"yyyy-MM-dd HH:mm:ss\")} Shutting down...\");\r\n",
        "});"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "// Save the store now that the subscription has been created; the value returned by SaveStore is not needed here.\r\n",
        "SaveStore(store);"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using System.Diagnostics;\r\n",
        "\r\n",
        "await WithExistingEngine(store, iemgr, async ctx =>\r\n",
        "{\r\n",
        "    // How long to keep events flowing, and how often to print the stats while doing so.\r\n",
        "    var runDuration = TimeSpan.FromSeconds(10);\r\n",
        "    var reportInterval = TimeSpan.FromSeconds(1);\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Connecting egress to client kernel...\");\r\n",
        "\r\n",
        "    // Both `using var` subscriptions are disposed when this callback exits.\r\n",
        "    using var receiverSubscription = SendVesselPositionsToClientKernel();\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Starting AIS event receiver...\");\r\n",
        "\r\n",
        "    using var publisherSubscription = ReceiveEventsFromAis();\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Events are being processed...\");\r\n",
        "\r\n",
        "    var sw = Stopwatch.StartNew();\r\n",
        "\r\n",
        "    // Print the stats once per report interval until the run duration has elapsed.\r\n",
        "    while (sw.Elapsed < runDuration)\r\n",
        "    {\r\n",
        "        await Task.Delay(reportInterval);\r\n",
        "\r\n",
        "        Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {stats}\");\r\n",
        "    }\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Shutting down...\");\r\n",
        "});"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "// Keep the value returned by SaveStore: the recovery step below loads the store back from it.\r\n",
        "var file = SaveStore(store);"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Recover the engine from state\r\n",
        "\r\n",
        "We'll reload the store from the file on disk, and then recover the engine from it."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "// Re-declares `store`, so the cells below use the instance loaded from `file` instead of the earlier one.\r\n",
        "var store = InMemoryKeyValueStore.Load(file);"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "using System.Diagnostics;\r\n",
        "\r\n",
        "// Reset the stats before this run starts reporting them.\r\n",
        "stats.Reset();\r\n",
        "\r\n",
        "await WithExistingEngine(store, iemgr, async ctx =>\r\n",
        "{\r\n",
        "    // How long to keep events flowing, and how often to print the stats while doing so.\r\n",
        "    var runDuration = TimeSpan.FromSeconds(10);\r\n",
        "    var reportInterval = TimeSpan.FromSeconds(1);\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Connecting egress to client kernel...\");\r\n",
        "\r\n",
        "    // Both `using var` subscriptions are disposed when this callback exits.\r\n",
        "    using var receiverSubscription = SendVesselPositionsToClientKernel();\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Starting AIS event receiver...\");\r\n",
        "\r\n",
        "    using var publisherSubscription = ReceiveEventsFromAis();\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Events are being processed...\");\r\n",
        "\r\n",
        "    var sw = Stopwatch.StartNew();\r\n",
        "\r\n",
        "    // Print the stats once per report interval until the run duration has elapsed.\r\n",
        "    while (sw.Elapsed < runDuration)\r\n",
        "    {\r\n",
        "        await Task.Delay(reportInterval);\r\n",
        "\r\n",
        "        Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {stats}\");\r\n",
        "    }\r\n",
        "\r\n",
        "    Console.WriteLine($\"{DateTime.Now:yyyy-MM-dd HH:mm:ss} Shutting down...\");\r\n",
        "});"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "dotnet_interactive": {
          "language": "csharp"
        }
      },
      "outputs": [],
      "source": [
        "// Save the store again after the run against the reloaded store.\r\n",
        "SaveStore(store);"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": ".NET (C#)",
      "language": "C#",
      "name": ".net-csharp"
    },
    "language_info": {
      "file_extension": ".cs",
      "mimetype": "text/x-csharp",
      "name": "C#",
      "pygments_lexer": "csharp",
      "version": "9.0"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 4
}